package com.opencee.cloud.msg.mq;

import com.alibaba.fastjson.JSONObject;
import com.opencee.cloud.msg.api.constatns.TopicType;
import com.opencee.cloud.msg.api.constatns.MsgConstants;
import com.opencee.cloud.msg.websocket.WebSocketEndpoint;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Configuration;

import java.nio.charset.StandardCharsets;

/**
 * 集群环境下实现websocket收发消息
 *
 * @author liuyadu
 */
@Slf4j
@Configuration
public class TopicWebsocketListener {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "${ws.queue}", autoDelete = "true"),
            exchange = @Exchange(value = MsgConstants.TOPIC_EXCHANGE, type = ExchangeTypes.TOPIC),
            key = "msg.topic.msg.websocket.#"
    ))
    public void onMessage(Message message) {
        try {
            String receivedMsg = new String(message.getBody(), "UTF-8");
            log.debug("websocket接收:{}", message);
            JSONObject delayedMessage = JSONObject.parseObject(receivedMsg);
            JSONObject msg = delayedMessage.getJSONObject("body");
            String action = delayedMessage.getString("action");
            if (TopicType.WS_SEND.getAction().equals(action)) {
                WebSocketEndpoint.sendMessage(msg.toJSONString(), msg.getString("receiverId"));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}
